4장 하둡 IO

1. 데이터 무결성

  • HDFS 는 데이터쓰기와 읽는 과정에서 체크섬을 계산하고 검증
  • io.bytes.per.checksum 에서 설정된 바이트마다 데이터에 대한 별도의 체크섬 생성
  • 체크섬의 크기는 512byte
  • CRC-32
  • 데이터노드 : 클라이언트에게 데이터 수신, 다른 데이터노드에게 데이터 복제과정에서 체크
  • 에러발생 \-> ChecksumException 발생
  • 클라이언트도 수신된 데이터에 체크섬 검증
  • 데이터노드는 체크섬 검증에 대한 로그 유지
  • DataBlockScanner : 주기적인 검증 (비트 로트 검증)
  • open() 메소드를 호출하기 전에 FileSystem.setVerifyChecksum() :검증여부 선택
  • 데이터 오류 발견시 : 네임노드에 보고 \-> 손상표시 \-> 새로운 복제본 다른 데이터노드에 할당 \-> 복제계수 회복 \-> 손상된 데이터노드 삭제
  • LocalFileSystem
    • 클라이언트 측 체크섬을 수행
    • 체크섬은 성능에 미치는 영향 미비
    • RawLocalFileSystem 으로 검증 불능화: fs.file.impl 속성을 org.apache.hadoop.fs.RawLocalFileSystem 값으로 설정 \-> file uri에 대한 구현을 변경
    • 내부적으로 ChecksumFileSystem 을 사용
  • ChecksumFileSystem
    • 에러 검출시 reportChecksumFailure()
    • LocalFileSystem 은 에러파일 + 체크섬을 bad_files 디렉토리로 이동시킴

2. 압축 및 코덱

  • 하둡에서 사용하는 압축 포맷
DEFLATE
gzip
ZIP
bzip2
LZO
  • 속도에 의한 압축비교
    LZO - gzip - ZIP - bzip2
  • 하둡 압축 코덱
DEFLATEorg.apache.hadoop.io.compress.DefaultCodec
gziporg.apache.hadoop.io.compress.GzipCodec
bzip2org.apache.hadoop.io.compress.BZip2Codec
LZOorg.apache.hadoop.io.compress.lzo.LzopCodec
  • 기본코드

압축기본코드


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;

import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;


public class StreamCompressor {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		String codecClassName = args[0];
		try {
			Class<?> codecClass = Class.forName(codecClassName);
			Configuration conf = new Configuration();
			CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
			
			CompressionOutputStream out = codec.createOutputStream(System.out);
			IOUtils.copyBytes(System.in, out, 4096, false);
			out.finish();
			
		} catch (IOException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}

}

3. 직렬화

  • 구조화된 객체를 바이트 스트림으로 전환하는 과정
  • 직렬화의 장점
    • 간결성
    • 고속화
    • 확장성
    • 상호운용성

(1) Writeable 인터페이스

  • write
  • readFields

(2) WritableComparable 인터페이스

  • Writable, Comparable 를 상속함

(3) RawComparator 인터페이스

  • Comparator 인터페이스를 상속
  • compare 메소드 기능
  • 스트림으로 읽힌 레코드를 구현자가 직접 비교

(4) WriableComparator

  • RawComparator의 일반적인 목적의 구현
  • 스트림으로부터 비교할 객체를 역직렬화
  • RawComparator 인스턴스를 구하기 위한 팩토리 기능

4. 파일 기반데이터 구조

(1) SequenceFile

  • 바이너리 키-값 쌍에 대한 영속적인 데이터 구조
  • SequenceFile.createWriter(FileSystem, Configuration, PATH, KEY TYPE, VALUE TYPE)
  • 생성코드

생성코드


import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;


public class SequenceFileWriteDemo {
	
	private static final String[] DATA ={
		"One, two, buckle my shoe",
		"Three, four, shut the door",
		"five, six, pick up sticks",
		"Seven, eight, lay them straight",
		"Nine, ten, a big fat gen"
	};
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		String uri = args[0];
		Configuration conf = new Configuration();
		SequenceFile.Writer writer = null;
		
		try {
			FileSystem fs = FileSystem.get(URI.create(uri), conf);
			Path path = new Path(uri);
			IntWritable key = new IntWritable();
			Text value = new Text();
			writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
			
			for(int i=0; i<100; i++){
				key.set(100 - i);
				value.set(DATA[i%DATA.length]);
				System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
				writer.append(key, value);
			}
			
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}finally{
			IOUtils.closeStream(writer);
		}
	}

}


  • 일반구조



  • 블록 압축구조


(2) MapFile

  • 키에 대한 색인과 함께 정렬된 SequenceFile
  • 키는 WritableComparable, 값은 Writable